Native allocator: dynamic settings, query/datafusion pools, plugin nodeStats #21732
gaurav-amz wants to merge 10 commits into
❌ Gradle check result for 9188043: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
9188043 to
d419b2a
Compare
PR Code Analyzer ❗AI-powered 'Code-Diff-Analyzer' found issues on commit d646dd4.
d419b2a to
10f3617
Compare
❌ Gradle check result for 498b502: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
    public interface NativeAllocatorListener {

        /**
         * Invoked after a pool's limit has been updated.
         *
         * @param poolName logical pool name
         * @param newLimit the new limit in bytes
         */
        void onPoolLimitChanged(String poolName, long newLimit);
    }
There was a problem hiding this comment.
Can we not use the ClusterSettings update consumer instead?
There was a problem hiding this comment.
After the FlightTransport / AnalyticsSearchService / DefaultPlanExecutor migration, the SPI had zero production callers.
Bukhtawar
left a comment
There was a problem hiding this comment.
We might need to reject the request by throwing OpenSearchRejectedExecutionException rather than OOM-ing. Also, let's see how we can wire up a circuit breaker for this. Maybe circuit-breaker stats are something we can leverage for tracking memory used.
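A minimal sketch of the translation the reviewer is asking for, assuming a per-request allocation boundary. `PoolExhaustedException` and `RequestRejectedException` are hypothetical stand-ins for Arrow's `OutOfMemoryException` and OpenSearch's `OpenSearchRejectedExecutionException`, and `RejectionSketch.wrap` is an illustrative name, not the helper from this PR.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for Arrow's OutOfMemoryException. */
class PoolExhaustedException extends RuntimeException {
    PoolExhaustedException(String message) {
        super(message);
    }
}

/** Hypothetical stand-in for the rejection exception that the REST layer would map to HTTP 429. */
class RequestRejectedException extends RuntimeException {
    RequestRejectedException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class RejectionSketch {
    private RejectionSketch() {}

    /**
     * Runs the allocation and converts pool exhaustion into a rejection.
     * Any other exception propagates unchanged.
     */
    static <T> T wrap(String context, Supplier<T> allocation) {
        try {
            return allocation.get();
        } catch (PoolExhaustedException e) {
            throw new RequestRejectedException("native memory exhausted during " + context, e);
        }
    }

    public static void main(String[] args) {
        // Successful allocations pass through untouched.
        System.out.println(wrap("demo", () -> "allocated"));
        try {
            wrap("per-query allocator", () -> {
                throw new PoolExhaustedException("limit reached");
            });
        } catch (RequestRejectedException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Keeping the original exception as the cause preserves the allocator's diagnostic message for logs while the caller only sees the backpressure signal.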
    // and capped by the framework alongside ingest, query, and datafusion. Hard-fail if
    // the framework plugin is missing: silently falling back to a separate root would
    // break the same-root invariant for cross-plugin Arrow handoff.
    ArrowNativeAllocator nativeAllocator = ArrowNativeAllocator.instance();
There was a problem hiding this comment.
Please change how ArrowNativeAllocator is provided.
See ArrowAllocatorService for an example.
Initialize:
Return it from createComponents so that it becomes part of Guice.
How a plugin gets it:
From the plugin's createComponents
Or through Guice
We can also remove ArrowAllocatorService in this PR, since it exists to provide the root allocator to other extension plugins. ArrowNativeAllocator should be the one doing that now, through getPoolAllocator.
There was a problem hiding this comment.
Thanks for pointing this out; I will make this change.
This PR seems like a good place to get some alignment on memory assignment across the different components in the system. I think Java Arrow memory is mostly used for intermediate data transfer, while Rust memory is used for query execution. That is one reason to give more to the Rust side.
❌ Gradle check result for d646dd4: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
d646dd4 to
53c0e36
Compare
❌ Gradle check result for e1f3c5b: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
e1f3c5b to
041dc0e
Compare
❌ Gradle check result for 041dc0e: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
bowenlan-amzn
left a comment
There was a problem hiding this comment.
Overall looks good! Thanks for refactoring the allocator-related changes in arrow-base.
I left some comments that should be easy to accommodate. Approving now.
     *
     * <p>Default: empty.
     */
    public List<PluginNodeStats> nodeStats() {
There was a problem hiding this comment.
I hope we can have an ExperimentalApi annotation here.
      IndexSettings indexSettings,
    - ThreadPool threadPool
    + ThreadPool threadPool,
    + org.opensearch.arrow.allocator.ArrowNativeAllocator nativeAllocator
There was a problem hiding this comment.
nit: use an import here and in all the other places.
    BufferAllocator flightPool = nativeAllocator.getPoolAllocator(NativeAllocatorPoolConfig.POOL_FLIGHT);
    flightAllocator = flightPool.newChildAllocator("flight", 0, flightPool.getLimit());
There was a problem hiding this comment.
It does not seem necessary to create flightAllocator here; flightPool should be the flightAllocator.
For the two child allocators, server and client, can we set the limit to Long.MAX_VALUE? I think they will still be bound by their parent, which is POOL_FLIGHT here.
This way, it seems we also don't need the listener setup here.
There was a problem hiding this comment.
Done. The intermediate flightAllocator is gone, flightPool is the parent directly, and serverAllocator / clientAllocator are children at Long.MAX_VALUE.
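The point of this exchange is that a child created at Long.MAX_VALUE is still bounded by its parent pool, so resizing the pool needs no listener. A simplified, single-threaded model of that parent-chain check, under stated assumptions: `PoolNode` is a hypothetical class written for illustration and is not Arrow's `Accountant` or `BufferAllocator`.

```java
/** Simplified model of hierarchical allocation accounting; not Arrow's actual implementation. */
final class PoolNode {
    private final PoolNode parent;
    private final String name;
    private long limit;
    private long used;

    PoolNode(PoolNode parent, String name, long limit) {
        this.parent = parent;
        this.name = name;
        this.limit = limit;
    }

    PoolNode child(String childName, long childLimit) {
        return new PoolNode(this, childName, childLimit);
    }

    void setLimit(long newLimit) {
        this.limit = newLimit;
    }

    long used() {
        return used;
    }

    String name() {
        return name;
    }

    /** Reserves bytes on this node and every ancestor, or reserves nothing and returns false. */
    boolean tryAllocate(long bytes) {
        // First pass: every node up the chain must have room.
        // Written as a subtraction so a Long.MAX_VALUE limit cannot overflow.
        for (PoolNode n = this; n != null; n = n.parent) {
            if (bytes > n.limit - n.used) {
                return false;
            }
        }
        // Second pass: commit the reservation on the whole chain.
        for (PoolNode n = this; n != null; n = n.parent) {
            n.used += bytes;
        }
        return true;
    }

    /** Returns bytes to this node and every ancestor. */
    void release(long bytes) {
        for (PoolNode n = this; n != null; n = n.parent) {
            n.used -= bytes;
        }
    }

    public static void main(String[] args) {
        final long MiB = 1024L * 1024L;
        PoolNode root = new PoolNode(null, "root", 16 * MiB);
        PoolNode queryPool = root.child("query", 16 * MiB);
        PoolNode fragment = queryPool.child("frag-0", Long.MAX_VALUE);

        System.out.println("2 MiB before resize: " + fragment.tryAllocate(2 * MiB)); // true
        fragment.release(2 * MiB);

        // Shrinking the pool takes effect on the next allocation through the child.
        queryPool.setLimit(1 * MiB);
        System.out.println("2 MiB after resize: " + fragment.tryAllocate(2 * MiB)); // false
    }
}
```

Because the child never caches the parent's limit, the only state that has to change on a dynamic resize is the pool's own limit.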
    BufferAllocator queryPool = nativeAllocator.getPoolAllocator(NativeAllocatorPoolConfig.POOL_QUERY);
    this.allocator = queryPool.newChildAllocator("analytics-search-service", 0, queryPool.getLimit());
There was a problem hiding this comment.
Same as this comment in FlightTransport https://github.com/opensearch-project/OpenSearch/pull/21732/changes#r3269631251
There was a problem hiding this comment.
Same shape applied.
    BufferAllocator queryPool = nativeAllocator.getPoolAllocator(NativeAllocatorPoolConfig.POOL_QUERY);
    this.coordinatorAllocator = queryPool.newChildAllocator("coordinator", 0, queryPool.getLimit());
There was a problem hiding this comment.
Same as this comment in FlightTransport https://github.com/opensearch-project/OpenSearch/pull/21732/changes#r3269631251
❌ Gradle check result for 40688ad: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
… plugin nodeStats (review-feedback edition)

Builds on opensearch-project#21703 to add the query pool, DataFusion memory settings, dynamic tuning, stats, and pool wiring. This commit is the consolidated version after addressing review feedback on PR opensearch-project#21732.

## Architecture: three native-memory trackers, three knobs

Each tracker owns the bytes it can actually see, with a process-level cap above all of them. On a 64 GB / 16 GB-heap node the defaults compose to:

    RAM                                                64 GB
    JVM heap (operator-configured -Xmx)                16 GB
    Off-heap (RAM - heap)                              48 GB
      node.native_memory.limit (79% of off-heap)       37.92 GB  ← AC throttle threshold
        native.allocator.root.limit (20% of NM)        7.58 GB   ← Arrow framework cap
          pool.flight.max (5% of NM)                   1.90 GB   ← Flight transport pool
          pool.ingest.max (8% of NM)                   3.03 GB   ← Parquet VSR pool
          pool.query.max (5% of NM)                    1.90 GB   ← analytics-engine pool
        datafusion.memory_pool_limit (75% of NM)       28.44 GB  ← Rust runtime pool (sibling to Arrow)
      Unmanaged                                        10.08 GB  ← Lucene mmap, OS page cache, etc.

    Independent (disk budget):
      datafusion.spill_memory_limit_bytes (50% of physical RAM)  32 GB

## What this commit does

### 1. Dynamic settings + cluster-update consumers

All native-allocator settings are Setting.Property.Dynamic. Cluster-settings update consumers in ArrowBasePlugin#registerSettingsUpdateConsumers wire PUTs to live ArrowNativeAllocator state. The grouped validator (validateUpdate) rejects cross-setting violations (sum of pool mins > root, per-pool min > max) at PUT time with HTTP 400.

### 2. Query and DataFusion pools

POOL_QUERY added for analytics-engine per-query allocators (children of nativeAllocator.getPoolAllocator(POOL_QUERY)).

DataFusion gets its own setting (datafusion.memory_pool_limit_bytes) since DataFusion's working memory lives entirely on the Rust side and is reported only through DataFusion's MemoryPool. A Java-side pool that pretended to track it would either need a per-allocation FFM round-trip (performance disaster) or be a config-only mirror (HTTP 200 with no observable effect). POOL_DATAFUSION is intentionally absent; the Rust-side datafusion.memory_pool_limit_bytes is the honest knob for that layer.

### 3. Plugin#nodeStats SPI

New SPI method Plugin#nodeStats() returning List<PluginNodeStats>. NodeStats serializes plugin-contributed payloads via NamedWriteable, surfacing them under _nodes/stats. Annotated @ExperimentalApi to match PluginNodeStats.

### 4. Pool wiring through Guice (no static singleton)

ArrowNativeAllocator is constructed in ArrowBasePlugin#createComponents and returned to Node.java, which auto-binds it via Guice. Consumers (AnalyticsSearchService, FlightTransport, DefaultPlanExecutor) receive it through constructor @Inject, with no static instance() / INSTANCE / ensureForTesting. The createComponents body is extracted into a package-private buildAllocator helper so unit tests can exercise the full pool/consumer wiring without the heavyweight ClusterService fixture.

### 5. Drop NativeAllocatorListener SPI [@bowenlan-amzn #3269655712, #3269660170, @Bukhtawar #3267186318]

The original design used NativeAllocatorListener to push pool-limit changes into consumer-side child allocators that captured the pool's limit at construction time. With children created at Long.MAX_VALUE, Arrow's Accountant.allocate (lines 191-203 of Accountant.java in arrow-java v18.3.0) walks the parent chain on every allocation and checks the parent's allocationLimit, so dynamic resizes of parquet.native.pool.{flight,query}.max take effect immediately on subsequent allocations through any descendant. The listener was emulating this Arrow-native behavior and is no longer needed.

- AnalyticsSearchService: drop poolListener; child uses Long.MAX_VALUE
- DefaultPlanExecutor: drop poolListener; coordinator uses Long.MAX_VALUE
- FlightTransport: drop intermediate flightAllocator + poolListener; pool is the parent, server/client are Long.MAX_VALUE children
- libs/arrow-spi/.../NativeAllocatorListener.java: deleted
- NativeAllocator interface: addListener / removeListener removed
- ArrowNativeAllocator: listeners field, fireListeners method, and listener calls in setPoolLimit/setPoolMin/rebalance removed

The rebalancer continues to function as before: it changes pool limits via setLimit, which children read via Arrow's parent-cap check at allocateBytes. Rebalancer is off by default.

### 6. Fix Guice duplicate-binding regression on ArrowNativeAllocator

Removed the redundant ArrowBasePlugin#createGuiceModules override. Node.java line 1748 already binds every component returned from createComponents:

    pluginComponents.stream()
        .forEach(p -> b.bind((Class) p.getClass()).toInstance(p));

ArrowBasePlugin#createComponents already returns the allocator, so the explicit bind(ArrowNativeAllocator.class).toInstance(allocator) in createGuiceModules registered the same binding a second time and caused a Guice CreationException at cluster startup. This was the root cause of the gradle-check Jenkins CI failures on commits 9188043, 498b502, d646dd4, e1f3c5b, 041dc0e: every integration test failed during MockNode construction.

### 7. @ExperimentalApi on Plugin#nodeStats() [@bowenlan-amzn #3269563833]

Annotate the new SPI hook to match the @ExperimentalApi annotation already present on PluginNodeStats (the type the method returns).

### 8. FQN -> import nit in ParquetIndexingEngine [@bowenlan-amzn #3269589758]

Replace 3 fully-qualified org.opensearch.arrow.allocator.ArrowNativeAllocator references in constructor parameters with a single import.

### 9. Memory defaults aligned to the partitioning model

- node.native_memory.limit defaults to 79% × (RAM - heap) via OsProbe (cgroup-aware: container memory limit on K8s/Docker, total physical RAM on bare metal). Falls back to ZERO if the probe fails or heap >= RAM, preserving the pre-default opt-in semantics. The OsProbe-reading entry point delegates to a pure overload deriveNativeMemoryLimitDefault(long, long) so unit tests cover the fallback branches deterministically.
- native.allocator.root.limit defaults to 20% × node.native_memory.limit.
- parquet.native.pool.{flight,ingest,query}.max default to 5%/8%/5% of node.native_memory.limit (sum 18% < root's 20%, leaving 2 pp headroom inside the framework cap).
- datafusion.memory_pool_limit_bytes defaults to 75% × node.native_memory.limit (called out by @bharath-techie #3271093086: prior default Runtime.maxMemory() / 4 was JVM-heap derived, wrong baseline for an off-heap runtime).
- datafusion.spill_memory_limit_bytes defaults to 50% × physical RAM, independent of node.native_memory.limit (spill is a disk-staging budget, not a memory budget).

Behavior change to call out in upgrade notes: admission control is now active by default. Operators wanting the pre-existing opt-out behavior can set node.native_memory.limit: 0b; all framework caps then fall back to Long.MAX_VALUE unbounded mode.

### 10. Translate Arrow OutOfMemoryException to OpenSearchRejectedExecutionException at per-request boundaries [reviewer feedback]

Added an AllocationRejection helper in plugins/arrow-base that wraps Arrow OOM with OpenSearch's standard backpressure signal so the REST layer maps these failures to HTTP 429 (Too Many Requests) instead of a generic 500.

Wraps applied at the per-request allocation boundaries:

- DefaultPlanExecutor.executeInternal: per-query child allocator creation
- VSRPool.tryRotate: VSR rotation during ingest

Lifetime/startup allocations (FlightTransport.doStart, AnalyticsSearchService constructor, VSRPool.initializeActiveVSR) are intentionally left unwrapped: a failure there is a framework misconfiguration, not a per-request backpressure signal.

### 11. Integration tests for cap-enforcement boundaries

Added NativeAllocatorBoundaryIT under plugins/arrow-flight-rpc/src/internalClusterTest. Three tests boot a single-node cluster with tight memory settings (root=16 MiB, pool maxes=16 MiB) and exercise real Arrow allocations via the Guice-injected ArrowNativeAllocator:

- testPoolMaxRejectsAllocationsBeyondCap: PUT pool.query.max=4 MiB, verify an 8 MiB request through the pool throws OutOfMemoryException.
- testRootLimitRejectsAllocationsBeyondCap: hold 8 MiB in FLIGHT, verify a 16 MiB QUERY request fails at the root level (8+16 > 16 root cap) even though QUERY's own max would individually allow it.
- testDynamicPoolResizeAffectsInFlightAllocations: create child at Long.MAX_VALUE (the AnalyticsSearchService / DefaultPlanExecutor pattern post-listener-removal), verify a 2 MiB allocation succeeds, PUT query.max=1 MiB via cluster settings, verify a 2 MiB request now throws OOM via Arrow's parent-cap check at allocateBytes.

These cover the boundaries the framework is supposed to enforce. The third test specifically verifies the listener-replacement contract.

### 12. Unit tests for codecov patch-coverage gates

Added unit tests covering paths flagged by codecov as uncovered diff lines:

- ArrowBasePluginTests:
  * extended testPoolMaxRejectsNegative to cover INGEST_MAX and QUERY_MAX rejection branches.
  * testBuildAllocatorWiresAllPoolsAndSettingsConsumers exercises the full createComponents code path via the extracted buildAllocator helper: all three pools registered, pool maxes match operator-set values, cluster-settings update consumers wired.
- NodeStatsTests:
  * testPluginStatsDropsAllEntriesWhenReceiverHasNoRegistry covers the registry-null branch in NodeStats.readPluginStats (defensive path when the parent stream has no NamedWriteableRegistry attached).
  * testPluginNodeStatsDefaultReturnsEmpty covers the Plugin#nodeStats() default empty-list return.
- NodeResourceUsageTrackerTests:
  * testDeriveNativeMemoryLimitDefaultFallbackPaths covers the ResourceTrackerSettings.deriveNativeMemoryLimitDefault fallback branches (probe-returns-0, heap >= ram) plus the happy path.
- AllocationRejectionTests:
  * 4 tests covering success pass-through, OOM translation, non-OOM propagation, and the Runnable overload of AllocationRejection.wrap.

## How a query flows through these layers

Take a concrete example: a user issues a PPL query that goes through analytics-engine and dispatches to DataFusion.

1. Coordinator receives the request.
   → Admission control checks node.native_memory.limit budget.
   → If OK, proceeds; otherwise rejects with 429.
2. AnalyticsSearchService creates a per-fragment Arrow allocator:
   allocator = getPoolAllocator(POOL_QUERY).newChildAllocator("frag-N", 0, Long.MAX_VALUE)
   → Future BufferAllocator.buffer(...) calls on this allocator increment POOL_QUERY's counter and the root counter via Arrow's parent-cap chain.
   → Bounded by parquet.native.pool.query.max.
3. AnalyticsSearchService dispatches to DataFusion via NativeBridge.
   → NativeBridge.executeQueryAsync(...) marshals plan bytes via FFM.
4. DataFusion runs the query in Rust:
   → HashAggregate builds a hash table.
   → reservation.try_grow(50MB) → DataFusion MemoryPool counter += 50MB.
   → Bounded by datafusion.memory_pool_limit_bytes.
   → If exceeded, HashAggregate spills to disk (which itself is bounded by datafusion.spill_memory_limit_bytes).
5. DataFusion produces a result batch in Rust.
   → Java imports it via Arrow C Data Interface.
   → The import allocates Java ArrowBufs under the per-fragment allocator from step 2. POOL_QUERY counter += result_size.
6. Result returns to coordinator.
   → Per-fragment allocator closes; POOL_QUERY counter decrements.
   → DataFusion query completes; MemoryPool counter decrements.

Each layer accounts for what it owns. No double-counting. Each operator-tunable knob bounds a real, observable thing.

## Verification

    ./gradlew -Dsandbox.enabled=true \
      :plugins:arrow-base:test :plugins:arrow-flight-rpc:test \
      :sandbox:plugins:{analytics-engine,analytics-backend-datafusion,parquet-data-format}:test \
      :server:test --tests "org.opensearch.node.resource.tracker.*" \
      :server:test --tests "org.opensearch.action.admin.cluster.node.stats.NodeStatsTests" \
      :plugins:arrow-flight-rpc:internalClusterTest --tests "*NativeAllocatorBoundaryIT*" \
      spotlessJavaCheck
    → BUILD SUCCESSFUL

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
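The default budget for the 64 GB / 16 GB-heap example in the commit message can be reproduced with a few lines of integer arithmetic. This is a sketch only: `NativeMemoryDefaultsSketch`, `percentOf`, and `nativeLimitDefault` are hypothetical names, the code treats 1 GB as 2^30 bytes (an assumption, the commit message does not say), and the fallback rule is taken from the description of section 9 rather than from the PR's code.

```java
final class NativeMemoryDefaultsSketch {
    private NativeMemoryDefaultsSketch() {}

    /** Integer percentage of a byte count, rounded down. */
    static long percentOf(long bytes, int percent) {
        return Math.multiplyExact(bytes, (long) percent) / 100;
    }

    /**
     * 79% of (RAM - heap), or 0 when the probe failed (ram <= 0) or heap >= RAM.
     * Zero models the opt-out described in the commit message.
     */
    static long nativeLimitDefault(long ramBytes, long heapBytes) {
        if (ramBytes <= 0 || heapBytes >= ramBytes) {
            return 0L;
        }
        return percentOf(ramBytes - heapBytes, 79);
    }

    private static void print(String name, long bytes) {
        System.out.printf("%-40s %6.2f%n", name, bytes / (double) (1L << 30));
    }

    public static void main(String[] args) {
        final long gib = 1L << 30; // assumption: the commit's "GB" means 2^30 bytes
        long ram = 64 * gib;
        long heap = 16 * gib;
        long nativeLimit = nativeLimitDefault(ram, heap);

        print("off-heap (RAM - heap)", ram - heap);
        print("node.native_memory.limit (79%)", nativeLimit);
        print("native.allocator.root.limit (20%)", percentOf(nativeLimit, 20));
        print("pool.flight.max (5%)", percentOf(nativeLimit, 5));
        print("pool.ingest.max (8%)", percentOf(nativeLimit, 8));
        print("pool.query.max (5%)", percentOf(nativeLimit, 5));
        print("datafusion.memory_pool_limit (75%)", percentOf(nativeLimit, 75));
        print("unmanaged", (ram - heap) - nativeLimit);
        print("datafusion.spill_memory_limit (50% RAM)", percentOf(ram, 50));
    }
}
```

The three pool maxes sum to 18% of the native limit, which stays under the 20% root cap, and the root cap plus the DataFusion limit account for 95% of the native limit.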
7abcfce to
6d3436c
Compare
❌ Gradle check result for f4cbdd2: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
…Stats

Builds on opensearch-project#21703 to add the pieces a real production deployment needs:

* Make pool min/max settings dynamic and grouped-validate cross-setting invariants on every cluster-state update (sum of pool mins <= root, per-pool min <= max). FLIGHT_MIN/INGEST_MIN defaults are 0L (the prior Long.MAX_VALUE defaults caused the new validator to reject any non-MAX root). setPoolMin now updates the live BufferAllocator.setLimit so the Dynamic property has observable effect even when the rebalancer is off.
* Derive ROOT_LIMIT_SETTING default from the AC node-native-memory budget (limit minus buffer-percent) so the framework respects the same budget AC throttles on, with a Long.MAX_VALUE fallback when AC is unconfigured.
* Add QUERY pool alongside FLIGHT and INGEST. Pools init at min when the rebalancer is enabled; otherwise at max so non-rebalanced nodes can still allocate.
* Wire arrow-flight-rpc to the FLIGHT pool, parquet-data-format to the INGEST pool, and analytics-engine to the QUERY pool via the framework's allocator service. Hard-fail if the framework plugin is missing: silently skipping the wire-up is the silent-misconfiguration class of bug we want to prevent.
* Cleanup ad-hoc allocator fallbacks in parquet-data-format / analytics-engine so all Arrow consumers go through the unified pool hierarchy.
* Rebalancer now distributes headroom across all pools (not only those with current allocation > 0). Avoids the dead-pool corner case where a pool with min=0 starts at limit=0, can never make a first allocation, and never receives a bonus.

DataFusion runtime memory accounting stays separate. The Rust-side DataFusion MemoryPool is governed by datafusion.memory_pool_limit_bytes (unchanged from before), which is the right knob: DataFusion's internal sort/hash/group-by working memory is allocated by Rust, not through the Arrow Java BufferAllocator hierarchy, so a Java-side pool would resize a ceiling no allocator routes through. The framework's QUERY pool covers the cross-plugin Arrow allocations the analytics-engine plumbing makes, which is what we want bounded centrally.

Plugin _nodes/stats integration:

* Add Plugin#nodeStats() hook + PluginNodeStats interface in server.
* Wire NodeStats to carry Map<String, PluginNodeStats> with version-gated ser/deser at V_3_7_0 and top-level rendering under nodes.<id>.<name>.
* Each entry is wire-framed as (name, length-prefixed bytes); the receiver wraps the inner payload with NamedWriteableAwareStreamInput and drops entries whose subtype is not registered locally. This makes mixed-version rolling upgrades safe: a coordinator that lacks the plugin a data node is running keeps decoding the rest of NodeStats instead of failing the whole response.
* NativeAllocatorPluginStats adapter wraps NativeAllocatorPoolStats so the framework contributes to _nodes/stats. The dedicated _native_allocator/stats REST endpoint is gone, leaving one observability surface instead of two.
* Plugin stats are emitted on every _nodes/stats request regardless of the ?metric= filter; matches RemoteStoreNodeStats precedent.

DataFusion spill memory limit:

* Promote datafusion.spill_memory_limit_bytes to Setting.Property.Dynamic.
* Wire addSettingsUpdateConsumer that branches on NativeBridge.isSpillLimitDynamic(): mirrors live when df_set_spill_limit is exported, otherwise warns and waits for next node restart.

API hygiene:

* parquet.max_per_vsr_allocation_ratio is a divisor (limit/N), not a ratio. Renamed to parquet.max_per_vsr_allocation_divisor with a hard upper bound of 100 to reject fat-finger PUTs that would starve every VSR.

Real regressions caught during self-audit:

* Existing arrow-flight-rpc internalClusterTests and the sandbox coordinator ITs were not declaring the framework plugin in nodePlugins(). After the flight transport got wired to the FLIGHT pool, those ITs fail at node startup with "ArrowNativeAllocator not initialized". Each IT now installs the framework plugin and lists it as an extendedPlugin.

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
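The (name, length-prefixed bytes) framing described above is what lets a receiver skip payloads it cannot decode. A minimal sketch of that idea using plain java.io streams: `FramedStatsSketch` and its reader map are hypothetical and stand in for OpenSearch's StreamInput/StreamOutput and NamedWriteableRegistry, which are not used here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class FramedStatsSketch {
    private FramedStatsSketch() {}

    /** Writes each entry as: name, payload length, payload bytes. */
    static byte[] encode(Map<String, byte[]> entries) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(entries.size());
            for (Map.Entry<String, byte[]> entry : entries.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeInt(entry.getValue().length);
                out.write(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /**
     * Decodes entries whose name has a registered reader and drops the rest.
     * The length prefix lets the receiver consume an unknown payload without parsing it.
     */
    static Map<String, String> decode(byte[] wire, Map<String, Function<byte[], String>> readers) {
        Map<String, String> decoded = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String name = in.readUTF();
                byte[] payload = new byte[in.readInt()];
                in.readFully(payload);
                Function<byte[], String> reader = readers.get(name);
                if (reader != null) {
                    decoded.put(name, reader.apply(payload));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return decoded;
    }
}
```

Without the length prefix, one unknown subtype would leave the stream position undefined and the remaining entries could not be read.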
…gistry injection

Removes ArrowNativeAllocator.instance(), the static INSTANCE field, and ensureForTesting(). Binds ArrowNativeAllocator into Guice from ArrowBasePlugin#createGuiceModules; consumer plugins (FlightStreamPlugin, AnalyticsPlugin, ParquetDataFormatPlugin) fetch it from PluginComponentRegistry and pass it through to FlightTransport, AnalyticsSearchService, ArrowBufferPool, and ParquetIndexingEngine via constructor parameters. Tests construct the allocator directly with the standard pools instead of relying on the test-only ensureForTesting() helper.

Addresses bowen's review comment to align with the ArrowAllocatorService provisioning pattern (returned from createComponents, bound via Guice).

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
…ocatorService

Bowen's review feedback: every Arrow consumer should source its allocator from one of the framework's named pools (FLIGHT/INGEST/QUERY) via ArrowNativeAllocator#getPoolAllocator. The old ArrowAllocatorService SPI let callers create root-siblings outside any pool, which left their allocations invisible in _nodes/stats.native_allocator.pools.* and created a footgun for cross-plugin handoffs.

Migrations:

* DefaultPlanExecutor.coordinatorAllocator -> child of POOL_QUERY (coordinator-side bytes now count against parquet.native.pool.query.max alongside the data-node-side per-fragment allocators)
* TransportNativeArrowStreamDataAction (stream-transport-example) -> POOL_FLIGHT
* NativeArrowTransportIT.TransportTestArrowAction -> POOL_FLIGHT

Deletions:

* ArrowAllocatorService interface and DefaultArrowAllocatorService impl
* Guice binding bind(ArrowAllocatorService.class) in ArrowBasePlugin
* Dead allocatorService constructor parameter on FlightTransport and AnalyticsSearchService
* Now-empty org.opensearch.arrow.memory package (package-info.java)

ArrowBatchResponse Javadoc updated to point readers at the named-pool API.

Behavior change: on a node serving as both coordinator and data node for an analytics query, both sides of the query now count against parquet.native.pool.query.max. Operators upgrading should review the existing budget against total per-node analytics workload.

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
Self-review surfaced six items worth fixing before pushing:

* DefaultPlanExecutor now subscribes to the QUERY pool listener and updates coordinatorAllocator on dynamic resize, matching the pattern used by AnalyticsSearchService and FlightTransport. Without this, runtime grows of parquet.native.pool.query.max never reached the coordinator-side allocator.
* DefaultPlanExecutor.coordinatorAllocator is now sized at queryPool.getLimit() instead of Long.MAX_VALUE so the literal matches the actual cap.
* Drop the dead try/catch around removeListener in FlightTransport.stopInternal and AnalyticsSearchService.close. Those catches were holdovers from the singleton era and can't fire now that nativeAllocator is constructor-injected.
* Fix the joined-import line in AnalyticsSearchService that two earlier patches concatenated.
* Expand ArrowBatchResponse Javadoc with a pool-selection decision tree (FLIGHT for transport, INGEST for ingest path, QUERY for query execution) so readers know which pool to pick.
* Update misleading Javadoc in TransportNativeArrowStreamDataAction and DefaultPlanExecutor that referenced non-existent close paths. The pre-existing leak is preserved (out of scope for this PR) but documented honestly with a path forward.

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
… plugin nodeStats — review-feedback edition Builds on opensearch-project#21703 to add the query, datafusion with allocator, dynamic-tuning, stats, and pool-wiring pieces. This commit is the consolidated version after addressing review feedback on PR opensearch-project#21732. ## Architecture: three native-memory trackers, three knobs Each tracker owns the bytes it can actually see, with a process-level cap above all of them. On a 64 GB / 16 GB-heap node the defaults compose to: RAM 64 GB JVM heap (operator-configured -Xmx) 16 GB Off-heap (RAM - heap) 48 GB node.native_memory.limit (79% of off-heap) 37.92 GB ← AC throttle threshold native.allocator.root.limit (20% of NM) 7.58 GB ← Arrow framework cap pool.flight.max (5% of NM) 1.90 GB ← Flight transport pool pool.ingest.max (8% of NM) 3.03 GB ← Parquet VSR pool pool.query.max (5% of NM) 1.90 GB ← analytics-engine pool datafusion.memory_pool_limit (75% of NM) 28.44 GB ← Rust runtime pool (sibling to Arrow) Unmanaged 10.08 GB ← Lucene mmap, OS page cache, etc. Independent (disk budget): datafusion.spill_memory_limit_bytes (50% of physical RAM) 32 GB ## What this commit does ### 1. Dynamic settings + cluster-update consumers All native-allocator settings are Setting.Property.Dynamic. Cluster-settings update consumers in ArrowBasePlugin#registerSettingsUpdateConsumers wire PUTs to live ArrowNativeAllocator state. The grouped validator (validateUpdate) rejects cross-setting violations (sum of pool mins > root, per-pool min > max) at PUT time with HTTP 400. ### 2. Query and DataFusion pools POOL_QUERY added for analytics-engine per-query allocators (children of nativeAllocator.getPoolAllocator(POOL_QUERY)). 
DataFusion gets its own setting (datafusion.memory_pool_limit_bytes) since DataFusion's working memory lives entirely on the Rust side and is reported only through DataFusion's MemoryPool — a Java-side pool that pretended to track it would either need a per-allocation FFM round-trip (performance disaster) or be a config-only mirror (HTTP 200 with no observable effect). POOL_DATAFUSION is intentionally absent; the Rust-side datafusion.memory_pool_limit_bytes is the honest knob for that layer. ### 3. Plugin#nodeStats SPI New SPI method Plugin#nodeStats() returning List<PluginNodeStats>. NodeStats serializes plugin-contributed payloads via NamedWriteable, surfacing them under _nodes/stats. Annotated @experimentalapi to match PluginNodeStats. ### 4. Pool wiring through Guice (no static singleton) ArrowNativeAllocator is constructed in ArrowBasePlugin#createComponents and returned to Node.java which auto-binds it via Guice. Consumers (AnalyticsSearchService, FlightTransport, DefaultPlanExecutor) receive it through constructor @Inject — no static instance() / INSTANCE / ensureForTesting. The createComponents body is extracted into a package-private buildAllocator helper so unit tests can exercise the full pool/consumer wiring without the heavyweight ClusterService fixture. ### 5. Drop NativeAllocatorListener SPI [@bowenlan-amzn #3269655712, #3269660170, @Bukhtawar #3267186318] The original design used NativeAllocatorListener to push pool-limit changes into consumer-side child allocators that captured the pool's limit at construction time. With children created at Long.MAX_VALUE, Arrow's Accountant.allocate (lines 191-203 of Accountant.java in arrow-java v18.3.0) walks the parent chain on every allocation and checks the parent's allocationLimit — so dynamic resizes of parquet.native.pool.{flight,query}.max take effect immediately on subsequent allocations through any descendant. The listener was emulating this Arrow-native behavior and is no longer needed. 
- AnalyticsSearchService: drop poolListener; child uses Long.MAX_VALUE
- DefaultPlanExecutor: drop poolListener; coordinator uses Long.MAX_VALUE
- FlightTransport: drop intermediate flightAllocator + poolListener; pool is the parent, server/client are Long.MAX_VALUE children
- libs/arrow-spi/.../NativeAllocatorListener.java: deleted
- NativeAllocator interface: addListener / removeListener removed
- ArrowNativeAllocator: listeners field, fireListeners method, and listener calls in setPoolLimit/setPoolMin/rebalance removed

The rebalancer continues to function as before: it changes pool limits via setLimit, which children read via Arrow's parent-cap check at allocateBytes. Rebalancer is off by default.

### 6. Fix Guice duplicate-binding regression on ArrowNativeAllocator

Removed the redundant ArrowBasePlugin#createGuiceModules override. Node.java line 1748 already binds every component returned from createComponents:

    pluginComponents.stream()
        .forEach(p -> b.bind((Class) p.getClass()).toInstance(p));

ArrowBasePlugin#createComponents already returns the allocator, so the explicit bind(ArrowNativeAllocator.class).toInstance(allocator) in createGuiceModules registered the same binding a second time and caused a Guice CreationException at cluster startup. This was the root cause of the gradle-check Jenkins CI failures on commits 9188043, 498b502, d646dd4, e1f3c5b, 041dc0e: every integration test failed during MockNode construction.

### 7. @ExperimentalApi on Plugin#nodeStats() [@bowenlan-amzn #3269563833]

Annotate the new SPI hook to match the @ExperimentalApi annotation already present on PluginNodeStats (the type the method returns).

### 8. FQN -> import nit in ParquetIndexingEngine [@bowenlan-amzn #3269589758]

Replace 3 fully-qualified org.opensearch.arrow.allocator.ArrowNativeAllocator references in constructor parameters with a single import.

### 9. Memory defaults aligned to the partitioning model

- node.native_memory.limit defaults to 79% × (RAM - heap) via OsProbe (cgroup-aware: container memory limit on K8s/Docker, total physical RAM on bare metal). Falls back to ZERO if the probe fails or heap >= RAM, preserving the pre-default opt-in semantics. The OsProbe-reading entry point delegates to a pure overload deriveNativeMemoryLimitDefault(long, long) so unit tests cover the fallback branches deterministically.
- native.allocator.root.limit defaults to 20% × node.native_memory.limit.
- parquet.native.pool.{flight,ingest,query}.max default to 5%/8%/5% of node.native_memory.limit (sum 18% < root's 20%, leaving 2 pp headroom inside the framework cap).
- datafusion.memory_pool_limit_bytes defaults to 75% × node.native_memory.limit (called out by @bharath-techie #3271093086: the prior default Runtime.maxMemory() / 4 was JVM-heap derived, the wrong baseline for an off-heap runtime).
- datafusion.spill_memory_limit_bytes defaults to 50% × physical RAM, independent of node.native_memory.limit (spill is a disk-staging budget, not a memory budget).

Behavior change to call out in upgrade notes: admission control is now active by default. Operators wanting pre-existing opt-out behavior can set node.native_memory.limit: 0b; all framework caps then fall back to Long.MAX_VALUE unbounded mode.

### 10. Translate Arrow OutOfMemoryException to OpenSearchRejectedExecutionException at per-request boundaries [reviewer feedback]

Added an AllocationRejection helper in plugins/arrow-base that wraps Arrow OOM with OpenSearch's standard backpressure signal so the REST layer maps these failures to HTTP 429 (Too Many Requests) instead of a generic 500.
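A rough sketch of what such a translation helper might look like follows. The real AllocationRejection source is not shown here, so everything in the block is an assumption: the two exception classes are local stand-ins for Arrow's OutOfMemoryException and OpenSearchRejectedExecutionException, and the message format is invented. Only the wrap(context, body) shape with Supplier and Runnable overloads is taken from the text.

```java
import java.util.function.Supplier;

// Hypothetical sketch; not the AllocationRejection class from plugins/arrow-base.
final class AllocationRejectionSketch {

    /** Stand-in for Arrow's OutOfMemoryException (assumed to be unchecked). */
    static final class NativeOomStandIn extends RuntimeException {
        NativeOomStandIn(String message) {
            super(message);
        }
    }

    /** Stand-in for OpenSearchRejectedExecutionException, which the REST layer maps to 429. */
    static final class RejectedStandIn extends RuntimeException {
        RejectedStandIn(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Runs body; translates only the native OOM, letting every other exception propagate. */
    static <T> T wrap(String context, Supplier<T> body) {
        try {
            return body.get();
        } catch (NativeOomStandIn oom) {
            throw new RejectedStandIn("native memory exhausted during " + context, oom);
        }
    }

    /** Runnable overload for allocation sites that return nothing. */
    static void wrap(String context, Runnable body) {
        Supplier<Void> adapted = () -> {
            body.run();
            return null;
        };
        wrap(context, adapted);
    }

    public static void main(String[] args) {
        System.out.println(wrap("per-query child allocator creation", () -> "ok"));

        Runnable failingRotation = () -> {
            throw new NativeOomStandIn("pool limit exceeded");
        };
        try {
            wrap("VSR rotation", failingRotation);
        } catch (RejectedStandIn rejected) {
            System.out.println("rejected: " + rejected.getMessage());
        }
    }
}
```

Keeping the original exception as the cause preserves the Arrow diagnostic while the outer type carries the backpressure signal.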
Wraps applied at the per-request allocation boundaries:

- DefaultPlanExecutor.executeInternal: per-query child allocator creation
- VSRPool.tryRotate: VSR rotation during ingest

Lifetime/startup allocations (FlightTransport.doStart, AnalyticsSearchService constructor, VSRPool.initializeActiveVSR) are intentionally left unwrapped: a failure there is a framework misconfiguration, not a per-request backpressure signal.

### 11. Integration tests for cap-enforcement boundaries

Added NativeAllocatorBoundaryIT under plugins/arrow-flight-rpc/src/internalClusterTest. Three tests boot a single-node cluster with tight memory settings (root=16 MiB, pool maxes=16 MiB) and exercise real Arrow allocations via the Guice-injected ArrowNativeAllocator:

- testPoolMaxRejectsAllocationsBeyondCap: PUT pool.query.max=4 MiB, verify an 8 MiB request through the pool throws OutOfMemoryException.
- testRootLimitRejectsAllocationsBeyondCap: hold 8 MiB in FLIGHT, verify a 16 MiB QUERY request fails at the root level (8+16 > 16 root cap) even though QUERY's own max would individually allow it.
- testDynamicPoolResizeAffectsInFlightAllocations: create a child at Long.MAX_VALUE (the AnalyticsSearchService / DefaultPlanExecutor pattern post-listener-removal), verify a 2 MiB allocation succeeds, PUT query.max=1 MiB via cluster settings, verify a 2 MiB request now throws OOM via Arrow's parent-cap check at allocateBytes.

These cover the boundaries the framework is supposed to enforce. The third test specifically verifies the listener-replacement contract.

### 12. Unit tests for codecov patch-coverage gates

Added unit tests covering paths flagged by codecov as uncovered diff lines:

- ArrowBasePluginTests:
  * extended testPoolMaxRejectsNegative to cover INGEST_MAX and QUERY_MAX rejection branches.
  * testBuildAllocatorWiresAllPoolsAndSettingsConsumers exercises the full createComponents code path via the extracted buildAllocator helper: all three pools registered, pool maxes match operator-set values, cluster-settings update consumers wired.
- NodeStatsTests:
  * testPluginStatsDropsAllEntriesWhenReceiverHasNoRegistry covers the registry-null branch in NodeStats.readPluginStats (defensive path when the parent stream has no NamedWriteableRegistry attached).
  * testPluginNodeStatsDefaultReturnsEmpty covers the Plugin#nodeStats() default empty-list return.
- NodeResourceUsageTrackerTests:
  * testDeriveNativeMemoryLimitDefaultFallbackPaths covers the ResourceTrackerSettings.deriveNativeMemoryLimitDefault fallback branches (probe-returns-0, heap >= ram) plus the happy path.
- AllocationRejectionTests:
  * 4 tests covering success pass-through, OOM translation, non-OOM propagation, and the Runnable overload of AllocationRejection.wrap.

## How a query flows through these layers

Take a concrete example: a user issues a PPL query that goes through analytics-engine and dispatches to DataFusion.

1. Coordinator receives the request.
   → Admission control checks node.native_memory.limit budget.
   → If OK, proceeds; otherwise rejects with 429.
2. AnalyticsSearchService creates a per-fragment Arrow allocator:
       allocator = getPoolAllocator(POOL_QUERY).newChildAllocator("frag-N", 0, Long.MAX_VALUE)
   → Future BufferAllocator.buffer(...) calls on this allocator increment POOL_QUERY's counter and the root counter via Arrow's parent-cap chain.
   → Bounded by parquet.native.pool.query.max.
3. AnalyticsSearchService dispatches to DataFusion via NativeBridge.
   → NativeBridge.executeQueryAsync(...) marshals plan bytes via FFM.
4. DataFusion runs the query in Rust:
   → HashAggregate builds a hash table.
   → reservation.try_grow(50MB) → DataFusion MemoryPool counter += 50MB.
   → Bounded by datafusion.memory_pool_limit_bytes.
   → If exceeded, HashAggregate spills to disk (which itself is bounded by datafusion.spill_memory_limit_bytes).
5. DataFusion produces a result batch in Rust.
   → Java imports it via Arrow C Data Interface.
   → The import allocates Java ArrowBufs under the per-fragment allocator from step 2. POOL_QUERY counter += result_size.
6. Result returns to coordinator.
   → Per-fragment allocator closes; POOL_QUERY counter decrements.
   → DataFusion query completes; MemoryPool counter decrements.

Each layer accounts for what it owns. No double-counting. Each operator-tunable knob bounds a real, observable thing.

## Verification

    ./gradlew -Dsandbox.enabled=true \
        :plugins:arrow-base:test :plugins:arrow-flight-rpc:test \
        :sandbox:plugins:{analytics-engine,analytics-backend-datafusion,parquet-data-format}:test \
        :server:test --tests "org.opensearch.node.resource.tracker.*" \
        :server:test --tests "org.opensearch.action.admin.cluster.node.stats.NodeStatsTests" \
        :plugins:arrow-flight-rpc:internalClusterTest --tests "*NativeAllocatorBoundaryIT*" \
        spotlessJavaCheck
    → BUILD SUCCESSFUL

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
…load
The Runnable overload of AllocationRejection.wrap was missing @param tags
for context and body, failing the missingJavadoc gradle-check on CI:
AllocationRejection.java:75: error: AllocationRejection.wrap (method):
missing javadoc @param for parameter 'context'
Mirror the @param/@throws structure already present on the Supplier
overload above.
Signed-off-by: Gaurav Singh <snghsvn@amazon.com>
OpenSearchRejectedExecutionException handling has been addressed in this PR. Circuit-breaker changes will be included in a follow-up PR. Stats-related refactoring will be needed once Ajay’s PR is merged.
    public static final Setting<Integer> MAX_PER_VSR_ALLOCATION_DIVISOR = Setting.intSetting(
        "parquet.max_per_vsr_allocation_divisor",
        10,
        1,
        100,
        Setting.Property.NodeScope,
        Setting.Property.Dynamic
    );
Should this be a function of the number of cores?
    private static Map<String, PluginNodeStats> readPluginStats(StreamInput in) throws IOException {
        int size = in.readVInt();
        if (size == 0) {
            return Collections.emptyMap();
        }
        NamedWriteableRegistry registry = in.namedWriteableRegistry();
        Map<String, PluginNodeStats> result = new HashMap<>(size);
        for (int i = 0; i < size; i++) {
            String name = in.readString();
            BytesReference payload = in.readBytesReference();
            if (registry == null) {
                // No registry attached to the parent stream, so we can't deserialize any
                // entry. Drop the whole map. This branch is defensive; production paths
                // always set a registry when they expect named writeables.
                continue;
            }
            try (
                StreamInput rawIn = payload.streamInput();
                NamedWriteableAwareStreamInput payloadIn = new NamedWriteableAwareStreamInput(rawIn, registry)
            ) {
                payloadIn.setVersion(in.getVersion());
                PluginNodeStats stats = payloadIn.readNamedWriteable(PluginNodeStats.class);
Will _node/stats?metrics=<metric_name> work?
Yes, the stats will be available under the path _nodes/stats/native_allocator.
…lowup

# Conflicts:
#   sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java
#   server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java
#   server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java
#   server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java
#   server/src/main/java/org/opensearch/node/NodeService.java
#   test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java
Description
Builds on #21703, adding the query pool, the DataFusion memory-pool setting alongside the allocator, dynamic tuning, stats, and pool wiring.
Architecture: three native-memory trackers, three knobs
Each tracker owns the bytes it can actually see, with a process-level cap above all of them.
Three layers, three responsibilities. Each is necessary; none can be replaced by another.
- ArrowNativeAllocator accounts for every Arrow BufferAllocator allocation in the JVM. Partitioned into FLIGHT / INGEST / QUERY pools so one plugin can't starve another. All Arrow buffers descend from the same RootAllocator, preserving Arrow's same-root invariant for cross-plugin zero-copy handoff.
- DataFusion's MemoryPool accounts for DataFusion's own working memory and triggers spill / fail-fast when a query exceeds budget. Lives entirely on the Rust side; updates flow in via FFM through NativeBridge.setMemoryPoolLimit.
- node.native_memory.limit is the operator-declared off-heap budget AC throttles against; framework-derived defaults (root, pools, DataFusion pool) all scale from this single number.

POOL_DATAFUSION is intentionally not present. DataFusion's working memory is allocated by Rust operators directly and reported only to DataFusion's own MemoryPool; it never flows through Arrow's BufferAllocator API. Adding a Java-side pool that pretended to track it would have required either a per-allocation FFM round-trip (performance disaster) or a config-only mirror (a setting that returns HTTP 200 and silently does nothing). The Rust-side datafusion.memory_pool_limit_bytes is the honest knob for that layer.

Worked example: 64 GB / 16 GB-heap node
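With an operator-declared -Xmx16g on a 64 GB host (bare metal or cgroup-limited container), defaults compose as follows. As a flat table for quick reference:

| Setting | Default | Value on this node |
| node.native_memory.limit | 79% × (RAM − heap) | 37.92 GB |
| native.allocator.root.limit | 20% × node.native_memory.limit | 7.58 GB |
| parquet.native.pool.flight.max | 5% × node.native_memory.limit | 1.90 GB |
| parquet.native.pool.ingest.max | 8% × node.native_memory.limit | 3.03 GB |
| parquet.native.pool.query.max | 5% × node.native_memory.limit | 1.90 GB |
| datafusion.memory_pool_limit_bytes | 75% × node.native_memory.limit | 28.44 GB |

Independent budget (disk staging, not memory):

| datafusion.spill_memory_limit_bytes | 50% × physical RAM | 32 GB |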
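The default arithmetic for this node can be sketched in a few lines. The block below is illustrative only: the class name, the percentOf helper, and the argument order of deriveNativeMemoryLimitDefault (total RAM first, heap second) are assumptions, and integer truncation in the real implementation may differ.

```java
import java.util.Locale;

// Hypothetical sketch of the default derivation; not the ResourceTrackerSettings source.
final class NativeMemoryDefaultsSketch {

    /** Integer percentage of a byte count, truncating toward zero. */
    static long percentOf(long bytes, int percent) {
        return Math.multiplyExact(bytes, (long) percent) / 100;
    }

    /** 79% of (RAM - heap); falls back to 0 when the probe returns 0 or heap >= RAM. */
    static long deriveNativeMemoryLimitDefault(long totalRamBytes, long heapBytes) {
        if (totalRamBytes <= 0 || heapBytes >= totalRamBytes) {
            return 0L;
        }
        return percentOf(totalRamBytes - heapBytes, 79);
    }

    static double gib(long bytes) {
        return bytes / (double) (1L << 30);
    }

    static String line(String setting, long bytes) {
        return String.format(Locale.ROOT, "%-38s %6.2f GB", setting, gib(bytes));
    }

    public static void main(String[] args) {
        final long GiB = 1L << 30;
        long ram = 64 * GiB;
        long heap = 16 * GiB;
        long nm = deriveNativeMemoryLimitDefault(ram, heap);

        System.out.println(line("node.native_memory.limit", nm));
        System.out.println(line("native.allocator.root.limit", percentOf(nm, 20)));
        System.out.println(line("parquet.native.pool.flight.max", percentOf(nm, 5)));
        System.out.println(line("parquet.native.pool.ingest.max", percentOf(nm, 8)));
        System.out.println(line("parquet.native.pool.query.max", percentOf(nm, 5)));
        System.out.println(line("datafusion.memory_pool_limit_bytes", percentOf(nm, 75)));
        // Spill is derived from physical RAM, not from node.native_memory.limit.
        System.out.println(line("datafusion.spill_memory_limit_bytes", percentOf(ram, 50)));
    }
}
```

The fallback branches (probe returns 0, heap at least as large as RAM) are kept in the pure function so they can be checked without a real OS probe.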
How a query flows through these layers
Take a concrete example: a user issues a PPL query that goes through analytics-engine and dispatches to DataFusion.
Each layer accounts for what it owns. No double-counting (the result bytes are counted in DataFusion's pool only while they exist as Rust buffers; after Java imports them, ownership transfers and the Java side counts them; when Java closes the import, the bytes are freed). Each operator-tunable knob bounds a real, observable thing.
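The ownership handoff can be made concrete with a toy model. This is a sketch under stated assumptions: Tracker and handOff are hypothetical names invented here, the limits and sizes are arbitrary, and the real transfer happens through the Arrow C Data Interface rather than a single method call.

```java
// Hypothetical model of two independent trackers; not OpenSearch or DataFusion code.
final class QueryFlowAccountingSketch {

    /** A capped counter standing in for one tracker (a pool or DataFusion's MemoryPool). */
    static final class Tracker {
        final String name;
        final long limit;
        long used;

        Tracker(String name, long limit) {
            this.name = name;
            this.limit = limit;
        }

        boolean tryGrow(long bytes) {
            if (bytes > limit - used) {
                return false;
            }
            used += bytes;
            return true;
        }

        void shrink(long bytes) {
            used -= bytes;
        }
    }

    /** Moves accounting for the result bytes from the producer to the importer. */
    static boolean handOff(Tracker from, Tracker to, long bytes) {
        if (!to.tryGrow(bytes)) {
            return false; // importer is over its cap; the bytes stay with the producer
        }
        from.shrink(bytes);
        return true;
    }

    public static void main(String[] args) {
        final long MB = 1_000_000L;
        Tracker dataFusionPool = new Tracker("datafusion MemoryPool", 1_000 * MB);
        Tracker queryPool = new Tracker("POOL_QUERY", 100 * MB);

        dataFusionPool.tryGrow(50 * MB); // hash table built by the aggregate
        dataFusionPool.tryGrow(10 * MB); // result batch while it is still a Rust buffer
        handOff(dataFusionPool, queryPool, 10 * MB); // Java import takes ownership

        System.out.println(dataFusionPool.name + " used=" + dataFusionPool.used / MB + "MB");
        System.out.println(queryPool.name + " used=" + queryPool.used / MB + "MB");

        queryPool.shrink(10 * MB);      // per-fragment allocator closes
        dataFusionPool.shrink(50 * MB); // query completes
    }
}
```

At every point in this model the result bytes appear in exactly one tracker, which is the no-double-counting property claimed above.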
Operator surface
After this PR, an operator inspecting a node running the query above sees:
    {
      "native_allocator": {
        "root": {"allocated": "150MB", "limit": "7.58GB"},
        "pools": {
          "flight": {"allocated": "20MB", "limit": "1.90GB"},
          "ingest": {"allocated": "30MB", "limit": "3.03GB"},
          "query": {"allocated": "100MB", "limit": "1.90GB"}
        }
      },
      "datafusion": {
        "memory_pool": {"usage": "2.4GB", "limit": "28.44GB"},
        "spill": {"usage": "0", "limit": "32GB"}
      }
    }

Three numbers, three sources, three knobs. The operator looks at each in isolation when tuning that layer:

- datafusion.memory_pool_limit_bytes.
- parquet.native.pool.flight.max or lower parquet.native.pool.ingest.max.
- node.native_memory.limit.

All pool min/max settings are Setting.Property.Dynamic. Pool limit changes propagate to consumer-side child allocators automatically via Arrow's parent-cap check at allocateBytes: child allocators are created with Long.MAX_VALUE and inherit the live parent cap on every allocation, so dynamic resizes reach in-flight workloads without restart and without an explicit notification SPI. The grouped validator rejects cross-setting violations (sum of pool mins > root, per-pool min > max) at PUT time with HTTP 400 rather than at the next allocation.

Behavior change to call out
Admission control is now active by default. node.native_memory.limit defaults to 79% × (RAM − JVM heap) instead of 0 (unconfigured). On upgrade, AC will start tracking native memory utilization and the framework's pool caps will derive sensible values from the operator's declared off-heap budget without any explicit configuration.

Operators who want pre-existing opt-out behavior (AC unconfigured, all framework caps unbounded) can set:

    node.native_memory.limit: 0b

This restores the prior "explicit-opt-in" semantics, which is useful for nodes where Lucene mmap or non-Arrow native consumers dominate and the operator does not want admission control throttling search/index traffic.
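The opt-out rule can be pictured as a one-line fallback in whatever code derives a framework cap. The sketch below assumes a hypothetical derivedCap helper; the actual settings code may structure this differently.

```java
// Hypothetical sketch of the opt-out fallback; derivedCap is not a real OpenSearch method.
final class FrameworkCapSketch {

    /**
     * Returns the cap derived from node.native_memory.limit, or Long.MAX_VALUE
     * (unbounded mode) when the operator has set the limit to 0.
     */
    static long derivedCap(long nativeMemoryLimitBytes, int percent) {
        if (nativeMemoryLimitBytes <= 0) {
            return Long.MAX_VALUE;
        }
        return Math.multiplyExact(nativeMemoryLimitBytes, (long) percent) / 100;
    }

    public static void main(String[] args) {
        long configured = 40L * (1L << 30); // an example 40 GiB off-heap budget
        System.out.println("root cap with a budget: " + derivedCap(configured, 20));
        System.out.println("root cap when opted out: " + derivedCap(0L, 20));
    }
}
```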
Changes after initial review
- Dropped the NativeAllocatorListener SPI. The SPI was emulating Arrow's native parent-cap check at allocateBytes. With consumer-side child allocators created at Long.MAX_VALUE, dynamic pool resizes propagate automatically through Arrow's Accountant.allocate parent walk, so no listener machinery is needed. Resolves @bowenlan-amzn (#21732 (comment), #21732 (comment)) and @Bukhtawar (#21732 (comment)).
- Removed the redundant ArrowBasePlugin#createGuiceModules override; Node.java already auto-binds every component returned from createComponents. This was the root cause of the gradle-check Jenkins CI failures across multiple SHAs.
- Added @ExperimentalApi on Plugin#nodeStats() to match the annotation already on PluginNodeStats. Resolves @bowenlan-amzn (#21732 (comment)).
- Replaced fully-qualified names with an import in ParquetIndexingEngine (3 constructor parameters). Resolves @bowenlan-amzn (#21732 (comment)).
- Aligned memory defaults: node.native_memory.limit derives from RAM−heap (79%); root.limit is 20% of NM (Arrow gets a small fraction; DataFusion gets the larger 75% as a sibling); pool maxes anchor to NM at 5%/8%/5%; datafusion.memory_pool_limit_bytes is 75% of NM (replaces the prior JVM-heap-derived default flagged by @bharath-techie at #21732 (comment)); datafusion.spill_memory_limit_bytes is 50% of physical RAM (independent disk-staging budget).
- Added integration tests: three tests in NativeAllocatorBoundaryIT (plugins/arrow-flight-rpc/src/internalClusterTest) boot a real cluster with tight memory settings and verify cap enforcement end-to-end through actual Arrow allocations: per-pool max rejection, root-level rejection across pools, and dynamic pool-resize propagation to in-flight Long.MAX_VALUE children.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.